package spark.pipeline.FilmRecommend

import org.apache.log4j.{Level, Logger}
import org.apache.spark.ml.evaluation.RegressionEvaluator
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.ml.recommendation.{ALS, ALSModel}
import org.apache.spark.sql.functions.{col, split}
import org.apache.spark.sql.types.{DoubleType, IntegerType, LongType, StringType, StructField, StructType}

import java.util.Properties

/**
  * Trains an explicit-feedback ALS model on a ratings file and writes each user's
  * top-5 recommended movie IDs to a MySQL table.
  *
  * Each input line is parsed as `userID::movieID::rating::timestamp`.
  *
  * Usage: `UserRecommend [ratingsFile]` -- when no argument is given, the built-in
  * default path is used.
  */
object UserRecommend {
    /** Ratings file read when no path is supplied on the command line. */
    private val DefaultRatingsFile = "C:/Users/Lenovo/Desktop/Working/Python/data/ml-1m/ratings.data"

    /** Number of recommendations generated and stored per user. */
    private val TopN = 5

    /**
      * Converts one `userID::movieID::rating::timestamp` line into a Row.
      *
      * A malformed line fails the job (NumberFormatException or
      * ArrayIndexOutOfBoundsException) instead of being silently dropped.
      */
    private def parseRating(line: String): Row = {
        val fields = line.split("::")
        Row(fields(0).toInt, fields(1).toInt, fields(2).toDouble, fields(3).toLong)
    }

    /**
      * Builds an ALS estimator with 5 iterations and regularization 0.01 over the
      * userID / movieID / rating columns.
      *
      * @param implicitPrefs false for explicit feedback (ratings), true for implicit feedback
      */
    private def newAls(implicitPrefs: Boolean): ALS =
        new ALS()
          .setMaxIter(5)
          .setRegParam(0.01)
          .setImplicitPrefs(implicitPrefs)
          .setUserCol("userID")
          .setItemCol("movieID")
          .setRatingCol("rating")

    /**
      * Entry point: load ratings, train ALS, derive top-N movies per user, write them to MySQL.
      *
      * @param args optional; args(0) overrides the ratings file path
      */
    def main(args: Array[String]): Unit = {
        // Only log Spark errors so the console output stays readable.
        Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
        val spark = SparkSession.builder().appName("UserRecommend").master("local").getOrCreate()
        try {
            val file = args.headOption.getOrElse(DefaultRatingsFile)

            // textFile already yields one record per line. Blank lines are skipped because
            // they cannot be parsed. The RDD is cached because it is counted here and then
            // reused to build the DataFrame; without caching the file would be parsed twice.
            val rddRow = spark.sparkContext.textFile(file)
              .filter(_.trim.nonEmpty)
              .map(parseRating)
              .cache()
            println(s"一共有:${rddRow.count()}项评分")

            val schema = StructType(Array(
                StructField("userID", IntegerType),
                StructField("movieID", IntegerType),
                StructField("rating", DoubleType),
                StructField("timeStamp", LongType)
            ))
            val orgDF = spark.createDataFrame(rddRow, schema)
            orgDF.show(5, false)
            // testData is only needed by the disabled evaluation below, but the 70/30 split
            // still determines how much data the model is trained on.
            val Array(trainData, testData) = orgDF.randomSplit(Array(0.7, 0.3))

            val timeOld = System.currentTimeMillis()
            // Explicit-feedback model. For the implicit-feedback variant use
            // newAls(implicitPrefs = true) and fit it the same way.
            val alsExplicit = newAls(implicitPrefs = false)
            val modelExplicit = alsExplicit.fit(trainData)
            val elapsedSeconds = (System.currentTimeMillis() - timeOld) / 1000
            println(s"ALS时间花费为:${elapsedSeconds}s")

            // Optional RMSE evaluation on the held-out split (disabled). Rows containing
            // null/NaN values are removed with na.drop() before evaluating.
            // val predictionExplicit = modelExplicit.transform(testData).na.drop()
            // val evaluator = new RegressionEvaluator()
            //   .setMetricName("rmse")
            //   .setLabelCol("rating")
            //   .setPredictionCol("prediction")
            // println("显性ALS的均方差为:" + evaluator.evaluate(predictionExplicit))
            // predictionExplicit.show(10, false)

            // Recommend the top-N movies for every user.
            val a = modelExplicit.recommendForAllUsers(TopN)
            a.show(false)

            // One column per rank ("1".."5"), each holding the struct at that position of
            // the recommendations array.
            val rankColumns = (1 to TopN).map { rank =>
                col("recommendations").getItem(rank - 1).as(rank.toString)
            }
            val splitDF = a.select(col("userID") +: rankColumns: _*)

            // printSchema prints by itself and returns Unit; wrapping it in println would
            // additionally print a stray "()".
            splitDF.printSchema()

            // Keep only the movie ID of each ranked recommendation: Top1..Top5.
            val topColumns = (1 to TopN).map { rank =>
                col(rank.toString).getField("movieID").as(s"Top$rank")
            }
            val userRecommendDF = splitDF.select(col("userID") +: topColumns: _*)
            userRecommendDF.show(5, false)

            val prop = new Properties()
            prop.setProperty("user", "root")
            prop.setProperty("password", "******")
            prop.setProperty("driver", "com.mysql.jdbc.Driver")
            val url = "jdbc:mysql://localhost:3306/python_db"
            println("开始写入数据库")
            // "overwrite" replaces any existing contents of the target table.
            userRecommendDF.write.mode("overwrite").jdbc(url, "userRecommendMovieID", prop)
            println("完成写入数据库")
        } finally {
            // Always release the Spark session, even when parsing or the JDBC write fails.
            spark.stop()
        }
    }
}
